#!/usr/bin/env python

# Copyright (c) 2013 Intel Corporation
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 as
# published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.

# DESCRIPTION
# This script runs tests defined in meta/lib/selftest/
# Its purpose is to automate the testing of different bitbake tools.
# To use it you just need to source your build environment setup script and
# add the meta-selftest layer to your BBLAYERS.
# Call the script as: "oe-selftest --run-all-tests" to run all the tests in meta/lib/selftest/
# Call the script as: "oe-selftest --run-tests <module>.<Class>.<method>" to run just a single test
# E.g: "oe-selftest --run-tests bboutput.BitbakeLayers" will run just the BitbakeLayers class from meta/lib/selftest/bboutput.py


import os
import sys
import unittest
import logging
import argparse
import subprocess
import time as t

sys.path.insert(0, os.path.dirname(os.path.realpath(__file__)) + '/lib')
import scriptpath
scriptpath.add_bitbake_lib_path()
scriptpath.add_oe_lib_path()
import argparse_oe

import oeqa.selftest
import oeqa.utils.ftools as ftools
from oeqa.utils.commands import runCmd, get_bb_var, get_test_layer
from oeqa.selftest.base import oeSelfTest, get_available_machines

def logger_create():
    """Create the "selftest" logger, logging to a timestamped file and stdout.

    A new log file named oe-selftest-<timestamp>.log is created in the
    current working directory and the "oe-selftest.log" symlink is pointed
    at it. Messages of level DEBUG and above go to the file; INFO and above
    are also written to stdout.

    Returns:
        The configured logging.Logger named "selftest".
    """
    log_file = "oe-selftest-" + t.strftime("%Y-%m-%d_%H:%M:%S") + ".log"
    # lexists() instead of exists(): a dangling symlink (its old log file was
    # deleted) must be removed as well, otherwise os.symlink() fails because
    # the link name is already taken.
    if os.path.lexists("oe-selftest.log"):
        os.remove("oe-selftest.log")
    os.symlink(log_file, "oe-selftest.log")

    log = logging.getLogger("selftest")
    log.setLevel(logging.DEBUG)

    # Full detail goes to the log file
    fh = logging.FileHandler(filename=log_file, mode='w')
    fh.setLevel(logging.DEBUG)

    # The console only gets INFO and above
    ch = logging.StreamHandler(sys.stdout)
    ch.setLevel(logging.INFO)

    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    fh.setFormatter(formatter)
    ch.setFormatter(formatter)

    log.addHandler(fh)
    log.addHandler(ch)

    return log

# Module-wide logger used by every function below. Note that creating it has
# side effects at import time: a new log file and the "oe-selftest.log"
# symlink are created in the current working directory.
log = logger_create()

def get_args_parser():
    """Build the command line parser for oe-selftest.

    The run/list action options live in a required, mutually exclusive
    group, so exactly one of them has to be given. --coverage and --machine
    are registered on the parser itself, outside of that group.

    Returns:
        The configured argparse_oe.ArgumentParser instance.
    """
    parser = argparse_oe.ArgumentParser(
        description="Script that runs unit tests agains bitbake and other Yocto related tools. The goal is to validate tools functionality and metadata integrity. Refer to https://wiki.yoctoproject.org/wiki/Oe-selftest for more information.")

    # The registration order below is kept as-is because it drives the order
    # of the options in the generated help output.
    actions = parser.add_mutually_exclusive_group(required=True)
    actions.add_argument(
        '--run-tests', required=False, action='store', nargs='*',
        dest="run_tests", default=None,
        help='Select what tests to run (modules, classes or test methods). Format should be: <module>.<class>.<test_method>')
    actions.add_argument(
        '--run-all-tests', required=False, action="store_true",
        dest="run_all_tests", default=False,
        help='Run all (unhidden) tests')
    actions.add_argument(
        '--list-modules', required=False, action="store_true",
        dest="list_modules", default=False,
        help='List all available test modules.')
    actions.add_argument(
        '--list-classes', required=False, action="store_true",
        dest="list_allclasses", default=False,
        help='List all available test classes.')
    parser.add_argument(
        '--coverage', action="store_true",
        help="Run code coverage when testing")
    actions.add_argument(
        '--run-tests-by', required=False, dest='run_tests_by',
        default=False, nargs='*',
        help='run-tests-by <name|class|module|id|tag> <list of tests|classes|modules|ids|tags>')
    actions.add_argument(
        '--list-tests-by', required=False, dest='list_tests_by',
        default=False, nargs='*',
        help='list-tests-by <name|class|module|id|tag> <list of tests|classes|modules|ids|tags>')
    actions.add_argument(
        '--list-tests', required=False, action="store_true",
        dest="list_tests", default=False,
        help='List all available tests.')
    actions.add_argument(
        '--list-tags', required=False, dest='list_tags',
        default=False, action="store_true",
        help='List all tags that have been set to test cases.')
    parser.add_argument(
        '--machine', required=False, dest='machine',
        choices=['random', 'all'], default=None,
        help='Run tests on different machines (random/all).')

    return parser


def preflight_check():
    """Verify that the build environment is usable before running tests.

    Checks that BUILDDIR is set, changes into it if needed, checks that the
    meta-selftest layer is listed in BBLAYERS and finally runs "bitbake -p".

    Returns:
        True when all checks passed, False when BUILDDIR is not set or the
        meta-selftest layer is missing from BBLAYERS.
    """
    log.info("Checking that everything is in order before running the tests")

    builddir = os.environ.get("BUILDDIR")
    if not builddir:
        log.error("BUILDDIR isn't set. Did you forget to source your build environment setup script?")
        return False

    # Everything below is run from inside the build directory
    if os.getcwd() != builddir:
        log.info("Changing cwd to %s" % builddir)
        os.chdir(builddir)

    layers = get_bb_var("BBLAYERS")
    if "meta-selftest" not in layers:
        log.error("You don't seem to have the meta-selftest layer in BBLAYERS")
        return False

    log.info("Running bitbake -p")
    runCmd("bitbake -p")

    return True

def add_include():
    """Append the oe-selftest include lines to local.conf and bblayers.conf.

    Each file under $BUILDDIR/conf is only touched when it does not already
    contain the "#include added by oe-selftest.py" marker.
    """
    marker = "#include added by oe-selftest.py"
    builddir = os.environ.get("BUILDDIR")

    # (file relative to BUILDDIR, log message, text appended to the file)
    additions = (
        ("conf/local.conf",
         "Adding: \"include selftest.inc\" in local.conf",
         "\n#include added by oe-selftest.py\ninclude machine.inc\ninclude selftest.inc"),
        ("conf/bblayers.conf",
         "Adding: \"include bblayers.inc\" in bblayers.conf",
         "\n#include added by oe-selftest.py\ninclude bblayers.inc"),
    )

    for relpath, message, snippet in additions:
        conf_file = os.path.join(builddir, relpath)
        if marker in ftools.read_file(conf_file):
            continue
        log.info(message)
        ftools.append_file(conf_file, snippet)

def remove_include():
    """Remove the oe-selftest include lines from local.conf and bblayers.conf.

    Does nothing when BUILDDIR is not set. A file is only modified when it
    contains the "#include added by oe-selftest.py" marker.
    """
    builddir = os.environ.get("BUILDDIR")
    if builddir is None:
        return

    marker = "#include added by oe-selftest.py"

    # (file relative to BUILDDIR, log message, text removed from the file)
    removals = (
        ("conf/local.conf",
         "Removing the include from local.conf",
         "\n#include added by oe-selftest.py\ninclude machine.inc\ninclude selftest.inc"),
        ("conf/bblayers.conf",
         "Removing the include from bblayers.conf",
         "\n#include added by oe-selftest.py\ninclude bblayers.inc"),
    )

    for relpath, message, snippet in removals:
        conf_file = os.path.join(builddir, relpath)
        if marker in ftools.read_file(conf_file):
            log.info(message)
            ftools.remove_from_file(conf_file, snippet)

def remove_inc_files():
    """Delete the .inc files that oe-selftest creates (best effort).

    Removes $BUILDDIR/conf/selftest.inc, every test_recipe.inc found below
    the test layer, and then conf/bblayers.inc and conf/machine.inc.
    Missing files are ignored. Nothing is done when BUILDDIR is not set.
    """
    builddir = os.environ.get("BUILDDIR")
    if builddir is None:
        # Nothing can be located without a build directory
        return

    try:
        # If selftest.inc is absent this raises and the test layer is not
        # walked at all, so get_test_layer() is only called when needed.
        os.remove(os.path.join(builddir, "conf/selftest.inc"))
        for root, _, files in os.walk(get_test_layer()):
            for f in files:
                if f == 'test_recipe.inc':
                    os.remove(os.path.join(root, f))
    except (AttributeError, OSError):
        # Cleanup is best effort: missing files are not an error
        pass

    for incl_file in ['conf/bblayers.inc', 'conf/machine.inc']:
        try:
            os.remove(os.path.join(builddir, incl_file))
        except OSError:
            # Only ignore "cannot remove" errors, never e.g. KeyboardInterrupt
            pass

def get_tests(exclusive_modules=None, include_hidden=False):
    """Return the dotted names of the tests/modules to load.

    Args:
        exclusive_modules: iterable of "<module>[.<class>[.<method>]]"
            names. When it is non-empty only these names (prefixed with
            "oeqa.selftest.") are returned and no directory is scanned.
        include_hidden: when scanning, also return modules whose file name
            starts with a single underscore.

    Returns:
        A list of dotted names, each starting with "oeqa.selftest.".
    """
    testslist = ['oeqa.selftest.' + x for x in (exclusive_modules or [])]
    if testslist:
        return testslist

    # No explicit selection: collect every test module found on the
    # oeqa.selftest package path.
    for testpath in oeqa.selftest.__path__:
        for f in sorted(os.listdir(testpath)):
            if not f.endswith('.py') or f == 'base.py' or f.startswith('__'):
                continue
            if f.startswith('_') and not include_hidden:
                continue
            module = 'oeqa.selftest.' + f[:-3]
            # The same module may be visible through several path entries
            if module not in testslist:
                testslist.append(module)

    return testslist


class Tc:
    """Plain record describing one selftest test case."""

    def __init__(self, tcname, tcclass, tcmodule, tcid=None, tctag=None):
        # Identification of the test method: method, class and module names
        self.tcname, self.tcclass, self.tcmodule = tcname, tcclass, tcmodule
        self.tcid = tcid
        # A test case can have multiple tags (as list or as tuples) otherwise str suffice
        self.tctag = tctag
        # Dotted name of the test inside the oeqa.selftest package
        self.fullpath = '.'.join(('oeqa', 'selftest', tcmodule, tcclass, tcname))


def get_tests_from_module(tmod):
    """Collect the test cases defined by the oeSelfTest classes of a module.

    Args:
        tmod: dotted name of the module to import.

    Returns:
        A list of Tc objects, one per callable "test_*" attribute of every
        oeSelfTest subclass found in the module namespace. The list is empty
        when the module cannot be imported; that failure is logged.
    """
    import importlib

    tlist = []
    prefix = 'oeqa.selftest.'

    try:
        modlib = importlib.import_module(tmod)
    except Exception as e:
        # Listing stays best effort, but a module that cannot be loaded is
        # reported instead of being dropped silently.
        log.warning("Failed to import %s: %s" % (tmod, e))
        return tlist

    for mod in vars(modlib).values():
        is_test_class = isinstance(mod, type(oeSelfTest)) and issubclass(mod, oeSelfTest)
        if not is_test_class or mod is oeSelfTest:
            continue

        # NOTE: lstrip() cannot be used here as it strips a set of
        # characters, not a prefix.
        modname = mod.__module__
        if modname.startswith(prefix):
            modname = modname[len(prefix):]

        for test in dir(mod):
            if not test.startswith('test_'):
                continue
            # getattr() rather than vars(mod)[test]: dir() also lists
            # inherited methods, which are not in the class' own namespace.
            func = getattr(mod, test, None)
            if not callable(func):
                continue

            # Get test case id and feature tag; both are optional
            try:
                tid = func.test_case
            except AttributeError:
                print('DEBUG: tc id missing for ' + str(test))
                tid = None
            ttag = getattr(func, 'tag__feature', None)

            tlist.append(Tc(test, mod.__name__, modname, tid, ttag))

    return tlist


def get_all_tests():
    """Return the test cases of every non-hidden selftest module.

    Scans each directory on the oeqa.selftest package path for "*.py" files,
    skipping base.py and files starting with an underscore.

    Returns:
        A list of Tc objects, grouped by module in sorted module order.
    """
    prefix = 'oeqa.selftest.'
    tmodules = set()

    # Get all the test modules (except the hidden ones)
    for tpath in oeqa.selftest.__path__:
        for f in os.listdir(tpath):
            if f.endswith('.py') and not f.startswith('_') and f != 'base.py':
                # Slice the extension off. rstrip('.py') would strip any
                # trailing '.', 'p' and 'y' characters and mangle names such
                # as "buildhistory.py" into "buildhistor".
                tmodules.add(prefix + f[:-3])

    # Get all the tests from modules
    testlist = []
    for tmod in sorted(tmodules):
        testlist += get_tests_from_module(tmod)

    return testlist


def create_testsuite_by(criteria, keyword):
    """Create a testsuite (list of test names) selected by 'keyword'.

    Args:
        criteria: one of 'name', 'class', 'module', 'id' or 'tag'.
        keyword: a list of tests, classes, modules, ids or tags (strings).

    Returns:
        The sorted list of full dotted paths of all matching test cases.
        Any other criteria yields an empty list.
    """
    # NOTE: globing would be nice?
    ts = set()

    for tc in get_all_tests():
        if criteria == 'name':
            candidates = [tc.tcname]
        elif criteria == 'class':
            candidates = [tc.tcclass]
        elif criteria == 'module':
            candidates = [tc.tcmodule]
        elif criteria == 'id':
            candidates = [str(tc.tcid)]
        elif criteria == 'tag':
            # tc can have multiple tags (as list or tuple) otherwise as str.
            # Every tag is compared as a string, exactly as get_testsuite_by
            # does, so running and listing by tag select the same tests.
            if isinstance(tc.tctag, (list, tuple)):
                candidates = [str(tag) for tag in tc.tctag]
            else:
                candidates = [str(tc.tctag)]
        else:
            candidates = []

        if any(candidate in keyword for candidate in candidates):
            ts.add(tc.fullpath)

    return sorted(ts)


def get_testsuite_by(criteria, keyword):
    """Get the description of the test cases selected by 'keyword'.

    Args:
        criteria: one of 'name', 'class', 'module', 'id' or 'tag'.
        keyword: a list of tests, classes, modules, ids or tags (strings).

    Returns:
        A sorted list of (id, tag, name, class, module) tuples, one per
        matching test case. Tags given as a list are returned as a tuple.
        Any other criteria yields an empty list.
    """
    # NOTE: globing would be nice?
    ts = set()

    for tc in get_all_tests():
        if criteria == 'name':
            candidates = [tc.tcname]
        elif criteria == 'class':
            candidates = [tc.tcclass]
        elif criteria == 'module':
            candidates = [tc.tcmodule]
        elif criteria == 'id':
            candidates = [str(tc.tcid)]
        elif criteria == 'tag':
            # tc can have multiple tags (as list or tuple) otherwise as str
            if isinstance(tc.tctag, (list, tuple)):
                candidates = [str(tag) for tag in tc.tctag]
            else:
                candidates = [str(tc.tctag)]
        else:
            candidates = []

        if not any(candidate in keyword for candidate in candidates):
            continue

        # A list is not hashable, so a list of tags is stored as a tuple to
        # keep the whole record usable as a set member.
        tag = tuple(tc.tctag) if isinstance(tc.tctag, list) else tc.tctag
        ts.add((tc.tcid, tag, tc.tcname, tc.tcclass, tc.tcmodule))

    return sorted(ts)


def list_testsuite_by(criteria, keyword):
    """Print a table of the test cases selected by 'keyword'.

    Args:
        criteria: name, class, module, id, tag
        keyword: a list of tests, classes, modules, ids, tags
    """
    # NOTE: globing would be nice?
    row_format = '%-4s\t%-20s\t%-60s\t%-25s\t%-20s'
    separator = '_' * 150

    matches = get_testsuite_by(criteria, keyword)

    print(row_format % ('id', 'tag', 'name', 'class', 'module'))
    print(separator)
    for entry in matches:
        # Several tags on one test case are shown as a comma separated list
        if isinstance(entry[1], (tuple, list)):
            entry = (entry[0], ', '.join(entry[1]), entry[2], entry[3], entry[4])
        print(row_format % entry)
    print(separator)
    print('Filtering by:\t %s' % criteria)
    print('Looking for:\t %s' % ', '.join(str(x) for x in keyword))
    print('Total found:\t %s' % len(matches))


def list_tests():
    """Print a table with all available oe-selftest tests."""
    row_format = '%-4s\t%-20s\t%-60s\t%-25s\t%-20s'
    separator = '_' * 150

    all_tests = get_all_tests()

    print(row_format % ('id', 'tag', 'name', 'class', 'module'))
    print(separator)
    for tc in all_tests:
        # Several tags on one test case are shown as a comma separated list
        tag = tc.tctag
        if isinstance(tag, (tuple, list)):
            tag = ', '.join(tag)
        print(row_format % (tc.tcid, tag, tc.tcname, tc.tcclass, tc.tcmodule))
    print(separator)
    print('Total found:\t %s' % len(all_tests))


def list_tags():
    """Print every distinct tag set on the available test cases.

    This is useful when setting tags to test cases: the list of tags should
    be kept as minimal as possible.
    """
    found = set()

    for tc in get_all_tests():
        if not isinstance(tc.tctag, (tuple, list)):
            # Single tag (or None when the test case has no tag)
            found.add(tc.tctag)
        else:
            found.update(set(tc.tctag))

    print('Tags:\t%s' % ', '.join(map(str, found)))

def coverage_setup(run_tests, run_all_tests):
    """Write the coverage configuration for the testcases to be run.

    Creates $BUILDDIR/.coveragerc, removing a stale data file of the same
    name first. The measured sources are the BBLAYERS entries, the directory
    of this script and the sibling "bitbake" directory.

    Args:
        run_tests: list of selected test names, used to name the data file.
        run_all_tests: true when all tests are run; the data file is then
            named after "all_tests" unless run_tests already provided a name.

    Returns:
        The path of the written configuration file.
    """
    builddir = os.environ.get("BUILDDIR")
    coveragerc = "%s/.coveragerc" % builddir

    # Name the data file after what is being run
    suffix = '.'.join(run_tests) if run_tests else ""
    if not suffix and run_all_tests:
        suffix = "all_tests"
    data_file = "%s/.coverage." % builddir + suffix

    if os.path.isfile(data_file):
        os.remove(data_file)

    scripts_dir = os.path.dirname(os.path.realpath(__file__))
    bitbake_dir = os.path.join(os.path.dirname(scripts_dir), 'bitbake')

    with open(coveragerc, 'w') as cps:
        cps.write("[run]\n")
        cps.write("data_file = %s\n" % data_file)
        cps.write("branch = True\n")
        # Measure just BBLAYERS, scripts and bitbake folders
        cps.write("source = \n")
        source_dirs = get_bb_var('BBLAYERS').split() + [scripts_dir, bitbake_dir]
        for source_dir in source_dirs:
            cps.write("    %s\n" % source_dir)

    return coveragerc

def coverage_report():
    """Load the gathered coverage data and log the report.

    The coverage configuration is read from the file named by the
    COVERAGE_PROCESS_START environment variable. Reporting problems raised
    as CoverageException are logged as warnings instead of being raised.
    """
    try:
        # Coverage4 uses coverage.Coverage
        from coverage import Coverage
    except ImportError:
        # Coverage under version 4 uses coverage.coverage
        from coverage import coverage as Coverage

    try:
        from cStringIO import StringIO
    except ImportError:
        # cStringIO only exists on Python 2
        from io import StringIO
    from coverage.misc import CoverageException

    cov_output = StringIO()
    # Creating the coverage data with the setting from the configuration file
    cov = Coverage(config_file=os.environ.get('COVERAGE_PROCESS_START'))
    try:
        # Load data from the data file specified in the configuration
        cov.load()
        # Store report data in a StringIO variable
        cov.report(file=cov_output, show_missing=False)
        log.info("\n%s" % cov_output.getvalue())
    except CoverageException as e:
        # Show problems with the reporting. Since Coverage4 not finding any data to report raises an exception
        log.warning("%s" % str(e))
    finally:
        cov_output.close()


def main():
    """Parse the command line and perform the requested list or run action.

    Returns:
        0 when the requested action succeeded (listing actions included),
        1 on invalid --run-tests-by/--list-tests-by values, a failed
        preflight check, a test that cannot be loaded, no machine being
        available for "--machine all", or any unsuccessful test run.
    """
    import importlib

    parser = get_args_parser()
    args = parser.parse_args()

    # Add <layer>/lib to sys.path, so layers can add selftests
    log.info("Running bitbake -e to get BBPATH")
    bbpath = get_bb_var('BBPATH').split(':')
    layer_libdirs = [p for p in (os.path.join(l, 'lib') for l in bbpath) if os.path.exists(p)]
    sys.path.extend(layer_libdirs)
    reload(oeqa.selftest)

    valid_options = ['name', 'class', 'module', 'id', 'tag']
    ts = None

    # Both "*-tests-by" options default to False, so getting a list (even an
    # empty one) means the option was given on the command line.
    if isinstance(args.run_tests_by, list):
        if len(args.run_tests_by) < 2:
            # Without this check 'ts' was never set and the run failed later
            # with a NameError.
            print('--run-tests-by requires one of <name|class|module|id|tag> followed by at least one keyword.')
            return 1
        if args.run_tests_by[0] not in valid_options:
            print('--run-tests-by %s not a valid option. Choose one of <name|class|module|id|tag>.' % args.run_tests_by[0])
            return 1
        ts = create_testsuite_by(args.run_tests_by[0], args.run_tests_by[1:])

    if isinstance(args.list_tests_by, list):
        if len(args.list_tests_by) < 2:
            print('--list-tests-by requires one of <name|class|module|id|tag> followed by at least one keyword.')
            return 1
        if args.list_tests_by[0] not in valid_options:
            print('--list-tests-by %s not a valid option. Choose one of <name|class|module|id|tag>.' % args.list_tests_by[0])
            return 1
        list_testsuite_by(args.list_tests_by[0], args.list_tests_by[1:])

    if args.list_tests:
        list_tests()

    if args.list_tags:
        list_tags()

    # Listing the classes is done while listing the modules
    if args.list_allclasses:
        args.list_modules = True

    if args.list_modules:
        log.info('Listing all available test modules:')
        testslist = get_tests(include_hidden=True)
        for test in testslist:
            module = test.split('.')[-1]
            info = ''
            if module.startswith('_'):
                info = ' (hidden)'
            print(module + info)
            if args.list_allclasses:
                try:
                    modlib = importlib.import_module(test)
                    for v in vars(modlib):
                        candidate = vars(modlib)[v]
                        if isinstance(candidate, type(oeSelfTest)) and issubclass(candidate, oeSelfTest) and candidate != oeSelfTest:
                            print(" -- %s" % v)
                            for method in dir(candidate):
                                # getattr(): dir() also lists inherited
                                # methods, which vars(candidate) lacks
                                if method.startswith("test_") and callable(getattr(candidate, method, None)):
                                    print(" --  -- %s" % method)

                except (AttributeError, ImportError) as e:
                    print(e)

    if args.run_tests or args.run_all_tests or args.run_tests_by:
        if not preflight_check():
            return 1

        if args.run_tests_by:
            testslist = ts
        else:
            testslist = get_tests(exclusive_modules=(args.run_tests or []), include_hidden=False)

        suite = unittest.TestSuite()
        loader = unittest.TestLoader()
        loader.sortTestMethodsUsing = None
        runner = unittest.TextTestRunner(verbosity=2, resultclass=buildResultClass(args))
        # we need to do this here, otherwise just loading the tests
        # will take 2 minutes (bitbake -e calls)
        oeSelfTest.testlayer_path = get_test_layer()
        for test in testslist:
            log.info("Loading tests from: %s" % test)
            try:
                suite.addTests(loader.loadTestsFromName(test))
            except AttributeError as e:
                log.error("Failed to import %s" % test)
                log.error(e)
                return 1
        add_include()

        success = True
        if args.machine:
            # Custom machine sets only weak default values (??=) for MACHINE in machine.inc
            # This let test cases that require a specific MACHINE to be able to override it, using (?= or =)
            log.info('Custom machine mode enabled. MACHINE set to %s' % args.machine)
            if args.machine == 'random':
                os.environ['CUSTOMMACHINE'] = 'random'
                success = runner.run(suite).wasSuccessful()
            else:  # all
                machines = get_available_machines()
                if not machines:
                    log.error("No machines available to run the tests on")
                    return 1
                for m in machines:
                    log.info('Run tests with custom MACHINE set to: %s' % m)
                    os.environ['CUSTOMMACHINE'] = m
                    # Every run counts: a failure on any machine must fail
                    # the whole session, not only a failure on the last one.
                    if not runner.run(suite).wasSuccessful():
                        success = False
        else:
            success = runner.run(suite).wasSuccessful()

        log.info("Finished")

        return 0 if success else 1

    return 0

def buildResultClass(args):
    """Build a Result Class to use in the testcase execution

    Args:
        args: parsed command line arguments; the coverage, run_tests and
            run_all_tests attributes are read.

    Returns:
        The StampedResult class (not an instance), suitable as the
        resultclass of a unittest.TextTestRunner.
    """

    class StampedResult(unittest.TextTestResult):
        """
        Custom TestResult that prints the time when a test starts.  As oe-selftest
        can take a long time (ie a few hours) to run, timestamps help us understand
        what tests are taking a long time to execute.
        If coverage is required, this class executes the coverage setup and reporting.
        """
        # Default value, so stopTestRun() is safe even when startTestRun()
        # did not get to set it.
        coverage_installed = False

        def startTest(self, test):
            import time
            self.stream.write(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + " - ")
            super(StampedResult, self).startTest(test)

        def startTestRun(self):
            """ Setup coverage before running any testcase """
            if not args.coverage:
                return

            self.coverage_installed = False
            try:
                # check if user can do coverage
                import coverage
            except ImportError:
                log.warning('\n'.join(["python coverage is not installed",
                    "Make sure your coverage takes into account sub-process",
                    "More info on https://pypi.python.org/pypi/coverage"]))
                return

            log.info("Coverage is enabled")

            # In case the user has not set the variable COVERAGE_PROCESS_START,
            # create a default one and export it. The COVERAGE_PROCESS_START
            # value indicates where the coverage configuration file resides
            # More info on https://pypi.python.org/pypi/coverage
            try:
                if not os.environ.get('COVERAGE_PROCESS_START'):
                    os.environ['COVERAGE_PROCESS_START'] = coverage_setup(args.run_tests, args.run_all_tests)
            except Exception as e:
                # Tests still run without coverage, but the real reason is
                # reported instead of a misleading "not installed" message.
                log.warning("Could not set up coverage: %s" % e)
                return

            self.coverage_installed = True

        def stopTestRun(self):
            """ Report coverage data after the testcases are run """

            if args.coverage and self.coverage_installed:
                with open(os.environ['COVERAGE_PROCESS_START']) as ccf:
                    log.info("Coverage configuration file (%s)" % os.environ.get('COVERAGE_PROCESS_START'))
                    log.info("===========================")
                    log.info("\n%s" % "".join(ccf.readlines()))

                log.info("Coverage Report")
                log.info("===============")

                coverage_report()

    return StampedResult


if __name__ == "__main__":
    import traceback

    # Stays at 1 unless main() returns normally
    ret = 1
    try:
        ret = main()
    except Exception:
        traceback.print_exc(5)
    finally:
        # Always undo the changes made to the build configuration
        remove_include()
        remove_inc_files()
    sys.exit(ret)
